C++线程池实现

主要思想

让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。

这个循环该什么时候停止呢?

很简单,当线程池停止使用时,循环停止。

这样一来,就保证了thread函数的唯一性,而且复用线程执行task。

总结一下,我们的线程池的主要组成部分有二:

  • 任务队列(Task Queue)
  • 线程池(BMP)

线程池与任务队列之间的匹配操作,是典型的生产者-消费者模型,本模型使用了两个工具:一个mutex + 一个conditional_variablemutex就是锁,保证任务的添加和移除(获取)的互斥性;一个条件变量保证多个线程获取task的同步性:当任务队列为空时,线程应该等待(阻塞)。

BMP.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <queue>
#include <mutex>
#include <functional>
#include <thread>
#include <condition_variable>
#include <vector>
#include <future>

template <typename T>
class safeQueue // 线程安全队列类
{
private:
std::queue<T> _m_queue; // 线程队列
std::mutex _m_lock; // 线程锁

public:
bool empty()
{
std::lock_guard<std::mutex> en_lock(_m_lock); // 对函数直接加互斥锁,lock_guard自动回收
return _m_queue.empty(); // 返回队列是否为空
}
void push(T value)
{
std::lock_guard<std::mutex> en_lock(_m_lock);
_m_queue.push(value); // 压入队列
}
bool pop(T &t)
{
std::lock_guard<std::mutex> en_lock(_m_lock);
if (_m_queue.empty()) // 如果队列为空,返回false
{
return false;
}
t = std::move(_m_queue.front()); // 取出队列中的第一个元素
_m_queue.pop(); // 弹出队列中的第一个元素
return true;
}
int size()
{
std::lock_guard<std::mutex> en_lock(_m_lock);
return _m_queue.size(); // 返回队列大小
}
};

class BMP // 线程池类
{
private:
class worker // 工作线程类
{
private:
uint32_t _worker_id; // 工作线程id
BMP *_m_pool; // 线程池指针

public:
worker(const uint32_t id, BMP *pool) : _worker_id(id), _m_pool(pool) {}
void operator()() // 重载操作符
{
std::function<void()> func; // 基础函数
bool dequeued;
while (!_m_pool->_m_shutdown) // 如果线程池没有关闭
{
// 取出任务,唤醒进程,执行工作函数
{
// 为线程环境加锁,互访问工作线程的休眠和唤醒
std::unique_lock<std::mutex> lock(_m_pool->_m_conditional_mutex);

// 如果队列为空,线程休眠
if (_m_pool->_m_queue.empty())
{
_m_pool->_m_conditional_lock.wait(lock);
}
// 取出队列中的任务
dequeued = _m_pool->_m_queue.pop(func);
}

// 如果成功取出,执行工作函数
if (dequeued)
{
func();
}
}
}
};
bool _m_shutdown; // 线程池是否关闭
safeQueue<std::function<void()>> _m_queue; // 执行函数安全队列
std::vector<std::thread> _m_threads; // 线程队列
std::mutex _m_conditional_mutex; // 线程休眠锁互斥量
std::condition_variable _m_conditional_lock; // 线程环境锁,让线程处于休眠或者唤醒状态

public:
BMP(const int n_threads = 2) : _m_threads(std::vector<std::thread>(n_threads)), _m_shutdown(false) {}

BMP(const BMP &) = delete; // 删除拷贝构造函数
BMP(BMP &&) = delete; // 删除移动构造函数
BMP &operator=(const BMP &) = delete; // 删除拷贝赋值操作符
BMP &operator=(BMP &&) = delete; // 删除移动赋值操作符

void init()
{
for (uint32_t i = 0; i < _m_threads.size(); i++)
{
// 初始化工作线程
_m_threads.at(i) = std::thread(worker(i, this));
}
}

void shutdown()
{
_m_shutdown = true;
_m_conditional_lock.notify_all(); // 通知唤醒所有工作进程

for (uint32_t i = 0; i < _m_threads.size(); i++) // 等待所有线程完成
{
if (_m_threads[i].joinable()) // 判断线程是否在等待
{
_m_threads[i].join(); // 等待当前线程完成
}
}
}

// 任务提交函数
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
// 知道这是函数绑定和参数转发,但语法太难了。。。
// 用std::function包装函数,std::bind绑定参数,std::forward转发参数
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// 封装函数指针
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

std::function<void()> wrapper_func = [task_ptr]()
{
(*task_ptr)();
};

// 队列通用安全封包函数,并压入安全队列
_m_queue.push(wrapper_func);

// 唤醒一个等待中的线程
_m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
};

#endif

test.cpp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// test.cpp

#include <iostream>
#include <random>
#include "BMP.h"
std::random_device rd; // 真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt

std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数

auto rnd = std::bind(dist, mt);

// 设置线程睡眠时间
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}

// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}

void example()
{
// 创建3个线程的线程池
BMP pool(3);

// 初始化线程池
pool.init();

// 提交乘法操作,总共30个
for (int i = 1; i <= 3; ++i)
for (int j = 1; j <= 10; ++j)
{
pool.submit(multiply, i, j);
}

// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;

// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);

// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;

// 关闭线程池
pool.shutdown();
}

int main()
{
example();

return 0;
}

执行结果

image-20240316225555482

参考

基于C++11实现线程池 - 知乎 (zhihu.com)


桂ICP备2024024328号